// Copyright (C) Kumo inc. and its affiliates.
// Author: Jeff.li lijippy@163.com
// All rights reserved.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published
// by the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <https://www.gnu.org/licenses/>.
//


#include <pollux/substrait/substrait_to_pollux_plan.h>
#include <pollux/substrait/type_utils.h>
#include <pollux/substrait/variant_to_vector_converter.h>
#include <pollux/type/type.h>

namespace kumo::pollux::substrait {
    namespace {
        core::AggregationNode::Step toAggregationStep(
            const ::substrait::AggregateRel &sAgg) {
            if (sAgg.measures().size() == 0) {
                // When only groupings exist, set the phase to be Single.
                return core::AggregationNode::Step::kSingle;
            }

            // Use the first measure to set aggregation phase.
            const auto &firstMeasure = sAgg.measures()[0];
            const auto &aggFunction = firstMeasure.measure();
            switch (aggFunction.phase()) {
                case ::substrait::AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE:
                    return core::AggregationNode::Step::kPartial;
                case ::substrait::AGGREGATION_PHASE_INTERMEDIATE_TO_INTERMEDIATE:
                    return core::AggregationNode::Step::kIntermediate;
                case ::substrait::AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT:
                    return core::AggregationNode::Step::kFinal;
                case ::substrait::AGGREGATION_PHASE_INITIAL_TO_RESULT:
                    return core::AggregationNode::Step::kSingle;
                default:
                    POLLUX_FAIL("Aggregate phase is not supported.");
            }
        }

        core::SortOrder toSortOrder(const ::substrait::SortField &sortField) {
            switch (sortField.direction()) {
                case ::substrait::SortField_SortDirection_SORT_DIRECTION_ASC_NULLS_FIRST:
                    return core::kAscNullsFirst;
                case ::substrait::SortField_SortDirection_SORT_DIRECTION_ASC_NULLS_LAST:
                    return core::kAscNullsLast;
                case ::substrait::SortField_SortDirection_SORT_DIRECTION_DESC_NULLS_FIRST:
                    return core::kDescNullsFirst;
                case ::substrait::SortField_SortDirection_SORT_DIRECTION_DESC_NULLS_LAST:
                    return core::kDescNullsLast;
                default:
                    POLLUX_FAIL("Sort direction is not supported.");
            }
        }

        /// Holds the information required to create
        /// a project node to simulate the emit
        /// behavior in Substrait.
        struct EmitInfo {
            std::vector<core::TypedExprPtr> expressions;
            std::vector<std::string> projectNames;
        };

        /// Helper function to extract the attributes required to create a ProjectNode
        /// used for interpretting Substrait Emit.
        EmitInfo getEmitInfo(
            const ::substrait::RelCommon &relCommon,
            const core::PlanNodePtr &node) {
            const auto &emit = relCommon.emit();
            int emitSize = emit.output_mapping_size();
            EmitInfo emitInfo;
            emitInfo.projectNames.reserve(emitSize);
            emitInfo.expressions.reserve(emitSize);
            const auto &outputType = node->outputType();
            for (int i = 0; i < emitSize; i++) {
                int32_t mapId = emit.output_mapping(i);
                emitInfo.projectNames[i] = outputType->nameOf(mapId);
                emitInfo.expressions[i] = std::make_shared<core::FieldAccessTypedExpr>(
                    outputType->childAt(mapId), outputType->nameOf(mapId));
            }
            return emitInfo;
        }
    } // namespace

    core::PlanNodePtr SubstraitPolluxPlanConverter::processEmit(
        const ::substrait::RelCommon &relCommon,
        const core::PlanNodePtr &noEmitNode) {
        switch (relCommon.emit_kind_case()) {
            case ::substrait::RelCommon::EmitKindCase::kDirect:
                return noEmitNode;
            case ::substrait::RelCommon::EmitKindCase::kEmit: {
                auto emitInfo = getEmitInfo(relCommon, noEmitNode);
                return std::make_shared<core::ProjectNode>(
                    nextPlanNodeId(),
                    std::move(emitInfo.projectNames),
                    std::move(emitInfo.expressions),
                    noEmitNode);
            }
            default:
                POLLUX_FAIL("unrecognized emit kind");
        }
    }

    core::PlanNodePtr SubstraitPolluxPlanConverter::toPolluxPlan(
        const ::substrait::AggregateRel &aggRel) {
        auto childNode = convertSingleInput<::substrait::AggregateRel>(aggRel);
        core::AggregationNode::Step aggStep = toAggregationStep(aggRel);
        const auto &inputType = childNode->outputType();
        std::vector<core::FieldAccessTypedExprPtr> polluxGroupingExprs;

        // Get the grouping expressions.
        for (const auto &grouping: aggRel.groupings()) {
            for (const auto &groupingExpr: grouping.grouping_expressions()) {
                // Pollux's groupings are limited to be Field.
                polluxGroupingExprs.emplace_back(
                    exprConverter_->toPolluxExpr(groupingExpr.selection(), inputType));
            }
        }

        // Parse measures and get the aggregate expressions.
        // Each measure represents one aggregate expression.
        std::vector<core::AggregationNode::Aggregate> aggregates;
        aggregates.reserve(aggRel.measures().size());

        for (const auto &measure: aggRel.measures()) {
            core::FieldAccessTypedExprPtr mask;
            ::substrait::Expression substraitAggMask = measure.filter();
            // Get Aggregation Masks.
            if (measure.has_filter()) {
                if (substraitAggMask.ByteSizeLong() > 0) {
                    mask = std::dynamic_pointer_cast<const core::FieldAccessTypedExpr>(
                        exprConverter_->toPolluxExpr(substraitAggMask, inputType));
                }
            }

            const auto &aggFunction = measure.measure();
            auto funcName = substraitParser_->findPolluxFunction(
                functionMap_, aggFunction.function_reference());

            std::vector<core::TypedExprPtr> aggParams;
            aggParams.reserve(aggFunction.arguments().size());
            for (const auto &arg: aggFunction.arguments()) {
                aggParams.emplace_back(
                    exprConverter_->toPolluxExpr(arg.value(), inputType));
            }
            auto aggPolluxType = substraitParser_->parseType(aggFunction.output_type());
            auto aggExpr = std::make_shared<const core::CallTypedExpr>(
                aggPolluxType, std::move(aggParams), funcName);
            std::vector<TypePtr> rawInputTypes = SubstraitParser::getInputTypes(
                findFunction(aggFunction.function_reference()));
            aggregates.emplace_back(
                core::AggregationNode::Aggregate{aggExpr, rawInputTypes, mask, {}, {}});
        }

        bool ignoreNullKeys = false;
        std::vector<core::FieldAccessTypedExprPtr> preGroupingExprs;

        // Get the output names of Aggregation.
        std::vector<std::string> aggOutNames;
        aggOutNames.reserve(aggRel.measures().size());
        for (int idx = polluxGroupingExprs.size();
             idx < polluxGroupingExprs.size() + aggRel.measures().size();
             idx++) {
            aggOutNames.emplace_back(substraitParser_->makeNodeName(planNodeId_, idx));
        }

        auto aggregationNode = std::make_shared<core::AggregationNode>(
            nextPlanNodeId(),
            aggStep,
            polluxGroupingExprs,
            preGroupingExprs,
            aggOutNames,
            aggregates,
            ignoreNullKeys,
            childNode);

        if (aggRel.has_common()) {
            return processEmit(aggRel.common(), std::move(aggregationNode));
        } else {
            return aggregationNode;
        }
    }

    core::PlanNodePtr SubstraitPolluxPlanConverter::toPolluxPlan(
        const ::substrait::ProjectRel &projectRel) {
        auto childNode = convertSingleInput<::substrait::ProjectRel>(projectRel);

        // Construct Pollux Expressions.
        auto projectExprs = projectRel.expressions();
        std::vector<std::string> projectNames;
        std::vector<core::TypedExprPtr> expressions;
        projectNames.reserve(projectExprs.size());
        expressions.reserve(projectExprs.size());

        const auto &inputType = childNode->outputType();
        int colIdx = 0;
        // Note that Substrait projection adds the project expressions on top of the
        // input to the projection node. Thus we need to add the input columns first
        // and then add the projection expressions.

        // First, adding the project names and expressions from the input to
        // the project node.
        for (uint32_t idx = 0; idx < inputType->size(); idx++) {
            const auto &fieldName = inputType->nameOf(idx);
            projectNames.emplace_back(fieldName);
            expressions.emplace_back(std::make_shared<core::FieldAccessTypedExpr>(
                inputType->childAt(idx), fieldName));
            colIdx += 1;
        }

        // Then, adding project expression related project names and expressions.
        for (const auto &expr: projectExprs) {
            expressions.emplace_back(exprConverter_->toPolluxExpr(expr, inputType));
            projectNames.emplace_back(
                substraitParser_->makeNodeName(planNodeId_, colIdx));
            colIdx += 1;
        }

        if (projectRel.has_common()) {
            auto relCommon = projectRel.common();
            const auto &emit = relCommon.emit();
            int emitSize = emit.output_mapping_size();
            std::vector<std::string> emitProjectNames(emitSize);
            std::vector<core::TypedExprPtr> emitExpressions(emitSize);
            for (int i = 0; i < emitSize; i++) {
                int32_t mapId = emit.output_mapping(i);
                emitProjectNames[i] = projectNames[mapId];
                emitExpressions[i] = expressions[mapId];
            }
            return std::make_shared<core::ProjectNode>(
                nextPlanNodeId(),
                std::move(emitProjectNames),
                std::move(emitExpressions),
                std::move(childNode));
        } else {
            return std::make_shared<core::ProjectNode>(
                nextPlanNodeId(),
                std::move(projectNames),
                std::move(expressions),
                std::move(childNode));
        }
    }

    core::PlanNodePtr SubstraitPolluxPlanConverter::toPolluxPlan(
        const ::substrait::SortRel &sortRel) {
        auto childNode = convertSingleInput<::substrait::SortRel>(sortRel);

        auto [sortingKeys, sortingOrders] =
                processSortField(sortRel.sorts(), childNode->outputType());

        return std::make_shared<core::OrderByNode>(
            nextPlanNodeId(),
            sortingKeys,
            sortingOrders,
            false /*isPartial*/,
            childNode);
    }

    std::pair<
        std::vector<core::FieldAccessTypedExprPtr>,
        std::vector<core::SortOrder> >
    SubstraitPolluxPlanConverter::processSortField(
        const ::google::protobuf::RepeatedPtrField<::substrait::SortField> &
        sortFields,
        const RowTypePtr &inputType) {
        std::vector<core::FieldAccessTypedExprPtr> sortingKeys;
        std::vector<core::SortOrder> sortingOrders;
        sortingKeys.reserve(sortFields.size());
        sortingOrders.reserve(sortFields.size());

        for (const auto &sort: sortFields) {
            sortingOrders.emplace_back(toSortOrder(sort));

            if (sort.has_expr()) {
                auto expression = exprConverter_->toPolluxExpr(sort.expr(), inputType);
                auto fieldExpr =
                        std::dynamic_pointer_cast<const core::FieldAccessTypedExpr>(
                            expression);
                POLLUX_CHECK_NOT_NULL(
                    fieldExpr, " the sorting key in Sort Operator only support field");
                sortingKeys.emplace_back(fieldExpr);
            }
        }
        return {sortingKeys, sortingOrders};
    }

    core::PlanNodePtr SubstraitPolluxPlanConverter::toPolluxPlan(
        const ::substrait::FilterRel &filterRel) {
        auto childNode = convertSingleInput<::substrait::FilterRel>(filterRel);

        auto filterNode = std::make_shared<core::FilterNode>(
            nextPlanNodeId(),
            exprConverter_->toPolluxExpr(
                filterRel.condition(), childNode->outputType()),
            childNode);

        if (filterRel.has_common()) {
            return processEmit(filterRel.common(), std::move(filterNode));
        } else {
            return filterNode;
        }
    }

    core::PlanNodePtr SubstraitPolluxPlanConverter::toPolluxPlan(
        const ::substrait::FetchRel &fetchRel) {
        core::PlanNodePtr childNode;
        // Check the input of fetchRel, if it's sortRel, convert them into
        // topNNode. otherwise, to limitNode.
        ::substrait::SortRel sortRel;
        bool topNFlag;
        if (fetchRel.has_input()) {
            topNFlag = fetchRel.input().has_sort();
            if (topNFlag) {
                sortRel = fetchRel.input().sort();
                childNode = toPolluxPlan(sortRel.input());
            } else {
                childNode = toPolluxPlan(fetchRel.input());
            }
        } else {
            POLLUX_FAIL("Child Rel is expected in FetchRel.");
        }

        if (topNFlag) {
            auto [sortingKeys, sortingOrders] =
                    processSortField(sortRel.sorts(), childNode->outputType());

            POLLUX_CHECK_EQ(fetchRel.offset(), 0);

            return std::make_shared<core::TopNNode>(
                nextPlanNodeId(),
                sortingKeys,
                sortingOrders,
                (int32_t) fetchRel.count(),
                false /*isPartial*/,
                childNode);
        } else {
            return std::make_shared<core::LimitNode>(
                nextPlanNodeId(),
                static_cast<int32_t>(fetchRel.offset()),
                static_cast<int32_t>(fetchRel.count()),
                false /*isPartial*/,
                childNode);
        }
    }

    core::PlanNodePtr SubstraitPolluxPlanConverter::toPolluxPlan(
        const ::substrait::ReadRel &readRel,
        std::shared_ptr<SplitInfo> &splitInfo) {
        // emit is not allowed in TableScanNode and ValuesNode related
        // outputs
        if (readRel.has_common()) {
            POLLUX_USER_CHECK(
                !readRel.common().has_emit(),
                "Emit not supported for ValuesNode and TableScanNode related Substrait plans.");
        }
        // Get output names and types.
        std::vector<std::string> colNameList;
        std::vector<TypePtr> polluxTypeList;
        if (readRel.has_base_schema()) {
            const auto &baseSchema = readRel.base_schema();
            colNameList.reserve(baseSchema.names().size());
            for (const auto &name: baseSchema.names()) {
                colNameList.emplace_back(name);
            }
            polluxTypeList = substraitParser_->parseNamedStruct(baseSchema);
        }

        // Parse local files
        if (readRel.has_local_files()) {
            using SubstraitFileFormatCase =
                    ::substrait::ReadRel_LocalFiles_FileOrFiles::FileFormatCase;
            const auto &fileList = readRel.local_files().items();
            splitInfo->paths.reserve(fileList.size());
            splitInfo->starts.reserve(fileList.size());
            splitInfo->lengths.reserve(fileList.size());
            for (const auto &file: fileList) {
                // Expect all files to share the same index.
                splitInfo->partitionIndex = file.partition_index();
                splitInfo->paths.emplace_back(file.uri_file());
                splitInfo->starts.emplace_back(file.start());
                splitInfo->lengths.emplace_back(file.length());
                switch (file.file_format_case()) {
                    case SubstraitFileFormatCase::kOrc:
                        splitInfo->format = dwio::common::FileFormat::DWRF;
                        break;
                    case SubstraitFileFormatCase::kParquet:
                        splitInfo->format = dwio::common::FileFormat::PARQUET;
                        break;
                    default:
                        splitInfo->format = dwio::common::FileFormat::UNKNOWN;
                }
            }
        }

        // Do not hard-code connector ID and allow for connectors other than Hive.
        static const std::string kHiveConnectorId = "test-hive";

        // Pollux requires Filter Pushdown must being enabled.
        bool filterPushdownEnabled = true;
        std::shared_ptr<connector::hive::HiveTableHandle> tableHandle;
        if (!readRel.has_filter()) {
            tableHandle = std::make_shared<connector::hive::HiveTableHandle>(
                kHiveConnectorId,
                "hive_table",
                filterPushdownEnabled,
                common::SubfieldFilters{},
                nullptr,
                nullptr);
        } else {
            common::SubfieldFilters filters =
                    toPolluxFilter(colNameList, polluxTypeList, readRel.filter());
            tableHandle = std::make_shared<connector::hive::HiveTableHandle>(
                kHiveConnectorId,
                "hive_table",
                filterPushdownEnabled,
                std::move(filters),
                nullptr,
                nullptr);
        }

        // Get assignments and out names.
        std::vector<std::string> outNames;
        outNames.reserve(colNameList.size());
        std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle> >
                assignments;
        for (int idx = 0; idx < colNameList.size(); idx++) {
            auto outName = substraitParser_->makeNodeName(planNodeId_, idx);
            assignments[outName] = std::make_shared<connector::hive::HiveColumnHandle>(
                colNameList[idx],
                connector::hive::HiveColumnHandle::ColumnType::kRegular,
                polluxTypeList[idx],
                polluxTypeList[idx]);
            outNames.emplace_back(outName);
        }
        auto outputType = ROW(std::move(outNames), std::move(polluxTypeList));

        if (readRel.has_virtual_table()) {
            return toPolluxPlan(readRel, outputType);
        } else {
            return std::make_shared<core::TableScanNode>(
                nextPlanNodeId(),
                std::move(outputType),
                std::move(tableHandle),
                std::move(assignments));
        }
    }

    core::PlanNodePtr SubstraitPolluxPlanConverter::toPolluxPlan(
        const ::substrait::ReadRel &readRel,
        const RowTypePtr &type) {
        ::substrait::ReadRel_VirtualTable readVirtualTable = readRel.virtual_table();
        int64_t numVectors = readVirtualTable.values_size();
        int64_t numColumns = type->size();
        int64_t valueFieldNums =
                readVirtualTable.values(numVectors - 1).fields_size();
        std::vector<RowVectorPtr> vectors;
        vectors.reserve(numVectors);

        int64_t batchSize;
        // For the empty vectors, eg,vectors = make_row_vector(ROW({}, {}), 1).
        if (numColumns == 0) {
            batchSize = 1;
        } else {
            batchSize = valueFieldNums / numColumns;
        }

        for (int64_t index = 0; index < numVectors; ++index) {
            std::vector<VectorPtr> children;
            ::substrait::Expression_Literal_Struct rowValue =
                    readRel.virtual_table().values(index);
            auto fieldSize = rowValue.fields_size();
            POLLUX_CHECK_EQ(fieldSize, batchSize * numColumns);

            for (int64_t col = 0; col < numColumns; ++col) {
                const TypePtr &outputChildType = type->childAt(col);
                std::vector<variant> batchChild;
                batchChild.reserve(batchSize);
                for (int64_t batchId = 0; batchId < batchSize; batchId++) {
                    // each value in the batch
                    auto fieldIdx = col * batchSize + batchId;
                    ::substrait::Expression_Literal field = rowValue.fields(fieldIdx);

                    auto expr = exprConverter_->toPolluxExpr(field);
                    if (auto constantExpr =
                            std::dynamic_pointer_cast<const core::ConstantTypedExpr>(
                                expr)) {
                        if (!constantExpr->hasValueVector()) {
                            batchChild.emplace_back(constantExpr->value());
                        } else {
                            POLLUX_UNSUPPORTED(
                                "Values node with complex type values is not supported yet");
                        }
                    } else {
                        POLLUX_FAIL("Expected constant expression");
                    }
                }
                children.emplace_back(
                    setVectorFromVariants(outputChildType, batchChild, pool_));
            }

            vectors.emplace_back(
                std::make_shared<RowVector>(pool_, type, nullptr, batchSize, children));
        }

        return std::make_shared<core::ValuesNode>(
            nextPlanNodeId(), std::move(vectors));
    }

    core::PlanNodePtr SubstraitPolluxPlanConverter::toPolluxPlan(
        const ::substrait::Rel &rel) {
        if (rel.has_aggregate()) {
            return toPolluxPlan(rel.aggregate());
        }
        if (rel.has_project()) {
            return toPolluxPlan(rel.project());
        }
        if (rel.has_filter()) {
            return toPolluxPlan(rel.filter());
        }
        if (rel.has_read()) {
            auto splitInfo = std::make_shared<SplitInfo>();

            auto planNode = toPolluxPlan(rel.read(), splitInfo);
            splitInfoMap_[planNode->id()] = splitInfo;
            return planNode;
        }
        if (rel.has_fetch()) {
            return toPolluxPlan(rel.fetch());
        }
        if (rel.has_sort()) {
            return toPolluxPlan(rel.sort());
        }
        POLLUX_NYI("Substrait conversion not supported for Rel.");
    }

    core::PlanNodePtr SubstraitPolluxPlanConverter::toPolluxPlan(
        const ::substrait::RelRoot &root) {
        // TODO: Use the names as the output names for the whole computing.
        if (root.has_input()) {
            const auto &rel = root.input();
            return toPolluxPlan(rel);
        }
        POLLUX_FAIL("Input is expected in RelRoot.");
    }

    core::PlanNodePtr SubstraitPolluxPlanConverter::toPolluxPlan(
        const ::substrait::Plan &substraitPlan) {
        POLLUX_CHECK(
            checkTypeExtension(substraitPlan),
            "The type extension only have unknown type.");
        // Construct the function map based on the Substrait representation.
        constructFunctionMap(substraitPlan);

        // Construct the expression converter.
        exprConverter_ =
                std::make_shared<SubstraitPolluxExprConverter>(pool_, functionMap_);

        // In fact, only one RelRoot or Rel is expected here.
        POLLUX_CHECK_EQ(substraitPlan.relations_size(), 1);
        const auto &rel = substraitPlan.relations(0);
        if (rel.has_root()) {
            return toPolluxPlan(rel.root());
        }
        if (rel.has_rel()) {
            return toPolluxPlan(rel.rel());
        }

        POLLUX_FAIL("RelRoot or Rel is expected in Plan.");
    }

    std::string SubstraitPolluxPlanConverter::nextPlanNodeId() {
        auto id = fmt::format("{}", planNodeId_);
        planNodeId_++;
        return id;
    }

    // This class contains the needed infos for Filter Pushdown.
    // TODO: Support different types here.
    class FilterInfo {
    public:
        // Used to set the left bound.
        void setLeft(double left, bool isExclusive) {
            left_ = left;
            leftExclusive_ = isExclusive;
            if (!isInitialized_) {
                isInitialized_ = true;
            }
        }

        // Used to set the right bound.
        void setRight(double right, bool isExclusive) {
            right_ = right;
            rightExclusive_ = isExclusive;
            if (!isInitialized_) {
                isInitialized_ = true;
            }
        }

        // Will fordis Null value if called once.
        void forbidsNull() {
            nullAllowed_ = false;
            if (!isInitialized_) {
                isInitialized_ = true;
            }
        }

        // Return the initialization status.
        bool isInitialized() {
            return isInitialized_ ? true : false;
        }

        // The left bound.
        std::optional<double> left_ = std::nullopt;
        // The right bound.
        std::optional<double> right_ = std::nullopt;
        // The Null allowing.
        bool nullAllowed_ = true;
        // If true, left bound will be exclusive.
        bool leftExclusive_ = false;
        // If true, right bound will be exclusive.
        bool rightExclusive_ = false;

    private:
        bool isInitialized_ = false;
    };

    common::SubfieldFilters SubstraitPolluxPlanConverter::toPolluxFilter(
        const std::vector<std::string> &inputNameList,
        const std::vector<TypePtr> &inputTypeList,
        const ::substrait::Expression &substraitFilter) {
        common::SubfieldFilters filters;
        // A map betweesn the column index and the FilterInfo for that column.
        std::unordered_map<int, std::shared_ptr<FilterInfo> > colInfoMap;
        for (int idx = 0; idx < inputNameList.size(); idx++) {
            colInfoMap[idx] = std::make_shared<FilterInfo>();
        }

        std::vector<::substrait::Expression_ScalarFunction> scalarFunctions;
        flattenConditions(substraitFilter, scalarFunctions);
        // Construct the FilterInfo for the related column.
        for (const auto &scalarFunction: scalarFunctions) {
            auto filterNameSpec = substraitParser_->findFunctionSpec(
                functionMap_, scalarFunction.function_reference());
            auto filterName = getNameBeforeDelimiter(filterNameSpec, ":");
            int32_t colIdx;
            // TODO: Add different types' support here.
            double val;
            for (auto &arg: scalarFunction.arguments()) {
                auto argExpr = arg.value();
                auto typeCase = argExpr.rex_type_case();
                switch (typeCase) {
                    case ::substrait::Expression::RexTypeCase::kSelection: {
                        auto sel = argExpr.selection();
                        // TODO: Only direct reference is considered here.
                        auto dRef = sel.direct_reference();
                        colIdx = substraitParser_->parseReferenceSegment(dRef);
                        break;
                    }
                    case ::substrait::Expression::RexTypeCase::kLiteral: {
                        auto sLit = argExpr.literal();
                        // TODO: Only double is considered here.
                        val = sLit.fp64();
                        break;
                    }
                    default:
                        POLLUX_NYI(
                            "Substrait conversion not supported for arg type '{}'", typeCase);
                }
            }
            if (filterName == "is_not_null") {
                colInfoMap[colIdx]->forbidsNull();
            } else if (filterName == "gte") {
                colInfoMap[colIdx]->setLeft(val, false);
            } else if (filterName == "gt") {
                colInfoMap[colIdx]->setLeft(val, true);
            } else if (filterName == "lte") {
                colInfoMap[colIdx]->setRight(val, false);
            } else if (filterName == "lt") {
                colInfoMap[colIdx]->setRight(val, true);
            } else {
                POLLUX_NYI(
                    "Substrait conversion not supported for filter name '{}'",
                    filterName);
            }
        }

        // Construct the Filters.
        for (int idx = 0; idx < inputNameList.size(); idx++) {
            auto filterInfo = colInfoMap[idx];
            double leftBound;
            double rightBound;
            bool leftUnbounded = true;
            bool rightUnbounded = true;
            bool leftExclusive = false;
            bool rightExclusive = false;
            if (filterInfo->isInitialized()) {
                if (filterInfo->left_) {
                    leftUnbounded = false;
                    leftBound = filterInfo->left_.value();
                    leftExclusive = filterInfo->leftExclusive_;
                }
                if (filterInfo->right_) {
                    rightUnbounded = false;
                    rightBound = filterInfo->right_.value();
                    rightExclusive = filterInfo->rightExclusive_;
                }
                bool nullAllowed = filterInfo->nullAllowed_;
                filters[common::Subfield(inputNameList[idx])] =
                        std::make_unique<common::DoubleRange>(
                            leftBound,
                            leftUnbounded,
                            leftExclusive,
                            rightBound,
                            rightUnbounded,
                            rightExclusive,
                            nullAllowed);
            }
        }
        return filters;
    }

    void SubstraitPolluxPlanConverter::flattenConditions(
        const ::substrait::Expression &substraitFilter,
        std::vector<::substrait::Expression_ScalarFunction> &scalarFunctions) {
        auto typeCase = substraitFilter.rex_type_case();
        switch (typeCase) {
            case ::substrait::Expression::RexTypeCase::kScalarFunction: {
                auto sFunc = substraitFilter.scalar_function();
                auto filterNameSpec = substraitParser_->findFunctionSpec(
                    functionMap_, sFunc.function_reference());
                // TODO: Only and relation is supported here.
                if (getNameBeforeDelimiter(filterNameSpec, ":") == "and") {
                    for (const auto &sCondition: sFunc.arguments()) {
                        flattenConditions(sCondition.value(), scalarFunctions);
                    }
                } else {
                    scalarFunctions.emplace_back(sFunc);
                }
                break;
            }
            default:
                POLLUX_NYI("GetFlatConditions not supported for type '{}'", typeCase);
        }
    }

    void SubstraitPolluxPlanConverter::constructFunctionMap(
        const ::substrait::Plan &substraitPlan) {
        // Construct the function map based on the Substrait representation.
        for (const auto &sExtension: substraitPlan.extensions()) {
            if (!sExtension.has_extension_function()) {
                continue;
            }
            const auto &sFmap = sExtension.extension_function();
            auto id = sFmap.function_anchor();
            auto name = sFmap.name();
            functionMap_[id] = name;
        }
    }

    bool SubstraitPolluxPlanConverter::checkTypeExtension(
        const ::substrait::Plan &substraitPlan) {
        for (const auto &sExtension: substraitPlan.extensions()) {
            if (!sExtension.has_extension_type()) {
                continue;
            }

            // Only support UNKNOWN type in UserDefined type extension.
            if (sExtension.extension_type().name() != "UNKNOWN") {
                return false;
            }
        }
        return true;
    }

    const std::string &SubstraitPolluxPlanConverter::findFunction(
        uint64_t id) const {
        return substraitParser_->findFunctionSpec(functionMap_, id);
    }
} // namespace kumo::pollux::substrait
